AWS CDK で SNS トピックにパブリッシュされたメッセージをフィルタリングしてみた
はじめに
プロフィールビューアーサービスProflly(プロフリー)の開発にて、SNS と SQS を使っていわゆる Fanout(参考) の構成で実装していたのですが、このイベントの時は、処理A
、処理B
、処理C
を実施したい、このイベントの時は、処理B
のみ実施したい・・・といった時に、SNS のメッセージフィルタリング機能を活用できそうでしたので、CDK での実装方法を紹介させていただきます。
作成するアーキテクチャ
SNS にパブリッシュされた1つのメッセージが、複数の SQS のキューにプッシュされるような構成となります。
この構成にメッセージによって、連携するキューをフィルタするように設定します。
この構成により、メッセージに応じた(イベントに応じた)処理の実施有無を制御することができます。
環境
- AWS CDK
1.122.0
- TypeScript
3.9.7
実装内容
利用するパッケージをインストール
今回作成するアーキテクチャに必要なパッケージをインストールします。
npm install --save-dev @aws-cdk/aws-sqs @aws-cdk/aws-sns @aws-cdk/aws-sns-subscriptions @aws-cdk/aws-lambda-nodejs @aws-cdk/aws-lambda-event-sources
スタックの実装
スタックの中で各 Construt を定義して、リソースを作成します。
以下のソースのハイライトされている部分がフィルタリングの条件を指定している部分です。
今回の例だと、メッセージ属性 processType
に b
が設定されているメッセージは、sampleQueueB
にメッセージを配信し、processType
が ["a", "b", "c"]
の場合、sampleQueueA
, sampleQueueB
, sampleQueueC
にメッセージを配信する設定となります。
import * as cdk from '@aws-cdk/core'; import {NodejsFunction} from "@aws-cdk/aws-lambda-nodejs"; import {Queue} from "@aws-cdk/aws-sqs"; import {SubscriptionFilter, Topic} from "@aws-cdk/aws-sns"; import {SqsSubscription} from "@aws-cdk/aws-sns-subscriptions"; import {SqsEventSource} from "@aws-cdk/aws-lambda-event-sources"; export class CdkSnsFilterSampleStack extends cdk.Stack { constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); // キューA と 処理Aのリソース作成とイベント連携設定 const sampleQueueA = new Queue(this, "sampleQueueA"); const sampleLambdaA = new NodejsFunction(this, "sampleLambdaA", { entry: "src/lambda-function-a.ts", handler: "handler", }); sampleLambdaA.addEventSource(new SqsEventSource(sampleQueueA)); // キューB と 処理Bのリソース作成とイベント連携設定 const sampleQueueB = new Queue(this, "sampleQueueB"); const sampleLambdaB = new NodejsFunction(this, "sampleLambdaB", { entry: "src/lambda-function-b.ts", handler: "handler", }); sampleLambdaB.addEventSource(new SqsEventSource(sampleQueueB)); // キューC と 処理Cのリソース作成とイベント連携設定 const sampleQueueC = new Queue(this, "sampleQueueC"); const sampleLambdaC = new NodejsFunction(this, "sampleLambdaC", { entry: "src/lambda-function-c.ts", handler: "handler", }); sampleLambdaC.addEventSource(new SqsEventSource(sampleQueueC)); // トピックに各キューをサブスクライブ設定 const sampleTopic = new Topic(this, "sampleTopic"); sampleTopic.addSubscription(new SqsSubscription(sampleQueueA, { // メッセージ属性 "processType" が "a" の場合、キューにメッセージを配信する filterPolicy: {processType: SubscriptionFilter.stringFilter({allowlist: ["a"]})} })); sampleTopic.addSubscription(new SqsSubscription(sampleQueueB, { // メッセージ属性 "processType" が "b" の場合、キューにメッセージを配信する filterPolicy: {processType: SubscriptionFilter.stringFilter({allowlist: ["b"]})} })); sampleTopic.addSubscription(new SqsSubscription(sampleQueueC, { // メッセージ属性 "processType" が "c" の場合、キューにメッセージを配信する filterPolicy: {processType: SubscriptionFilter.stringFilter({allowlist: ["c"]})} })); } }
デプロイ後のリソースを確認
以下のコマンドでデプロイを実行し、実行結果を確認してみます。
cdk deploy
SNS トピックとそのトピックをサブスクライブする SQS キューが設定されていることを確認できました。
サブスクリプションの設定を確認すると、サブスクリプションフィルターポリシーも設定されていることを確認できました。
実際にメッセージを配信してみた
マネージメントコンソールから、SNS を開き、該当のトピックを開きます。
その画面の メッセージの発行
ボタンをクリックし、トピックへのメッセージの発行画面へ遷移します。
発行するメッセージの本文になんらかの文字列を入力し、メッセージ属性に、タイプ:String
, 名前:processType
, 値:a
を設定し、メッセージの発行を実行します。
sampleQueueA
の Lambda トリガーに設定されている Lambda のみが実行されていることを確認できました。
今度は、メッセージの本文になんらかの文字列を入力し、メッセージ属性に、タイプ:String.Array
, 名前:processType
, 値:["a", "b", "c"]
を設定し、メッセージの発行を実行します。
各 SQS キューに紐付いている Lambda が実行されていることを確認できました。
さいごに
SNS と SQS を組み合わせて非同期に処理を実行するユースケースは多くあると思います。その際に、こういう条件の時は、処理を実行したくないといった要件を実現する際に、トリガーされる Lambda の処理内でなんらかの条件分岐で処理をスキップすることもできるかと思いますが、よりシンプルに、そもそも SQS のキューにメッセージを配信しない(フィルタする)という実装は、無駄に Lambda も実行されないし、選択肢としていいなぁと思いました。 どなたかの参考になれば幸いです。